// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "range_arena_rebuild.h"

#include "flush_manager.h"
#include "range_arena.h"

namespace rocksdb {
MultiRangeArena::MultiRangeArena(PureMemRep* table, Logger* info_log)
    : pmem_rep_(table), info_log_(info_log), event_manager_(new MultiRangeManager(this)) {
  // create first memblock
  char* start_key = new char[2];
  start_key[0] = 0;
  start_key[1] = 0;
  Slice start(start_key, 2);
  Slice end(new char(255), 1);
  auto first_arena = new RangeArena(
      kMemBlockMaxSize / 2,
      kMemBlockMaxSize * (kMemBlockGrowPer - kMemBlockReservedPer) / 200, start,
      end, nullptr, info_log);
  RangeArenaAdd(start, first_arena);
  first_arena->ChangeStatus(MemBlockState::MEMORY_BLOCK_OK);
  range_arenas_.Push(first_arena);
}

void MultiRangeArena::AllocateOK(const Slice& key, const uint32_t encoded_len,
                                 const char* handle, void* rangearena) {
  assert(rangearena != nullptr);
  auto current_rangearena = static_cast<RangeArena *>(rangearena);
  current_rangearena->AddData2OKList(encoded_len, handle);
}

// memblock allocate
char* MultiRangeArena::Allocate(size_t bytes, const Slice& userkey, void* node,
                                void** rangearena) {
  char* addr = nullptr;
  while (addr == nullptr) {
    RangeArena* current_range_arena = SearchBlock(userkey);
    assert(current_range_arena != nullptr);
    *rangearena = current_range_arena;

    // Call the Allocate function to allocate the corresponding number of bytes
    // of memory for the inserted data in the memory block
    bool raIsFull = current_range_arena->Allocate(bytes, node, &addr);
    if (raIsFull && current_range_arena->IsNormal()){
        if (current_range_arena->ChangeStatusOK2Handling()){
          event_manager_->AcceptEvent(current_range_arena, ARENA_EVENT_HANDLE);
        }
    }
  }
  return addr;
}

void* MultiRangeArena::Delete(const Slice& key, size_t len, void* rangearena, void* node) {
  auto* current_range_arena = static_cast<RangeArena*>(rangearena);
  if (current_range_arena == nullptr || !current_range_arena->IsNormal()
      ||current_range_arena->CurrentBlockKeyEnd().compare(key) <= 0) {
    current_range_arena = SearchBlock(key);
  }

  assert(node != nullptr);
  assert(current_range_arena != nullptr);
  size_t curDelSize = current_range_arena->IncreaseCurrentDeleteSize(len);
  AtomicLinkedList<void *>& nodePushList = current_range_arena->NodePush();
  nodePushList.insertHead(node);


  if (current_range_arena->IsNormal() &&
      curDelSize > current_range_arena->CurrentMemBlockMaxSize() * kMemBlockDeletedPer){
    // memblock cut event
    if(current_range_arena->ChangeStatusOK2Handling())
      event_manager_->AcceptEvent(current_range_arena, ARENA_EVENT_HANDLE);
  }
  return current_range_arena;
}

// insert the userkey and adress of memblock into the skiplist
// userkey: startkey of，current memblock  addr:head address of current memblock
void MultiRangeArena::RangeArenaAdd(const Slice& userKey, RangeArena* addr) {
  auto key_size = (uint32_t)(userKey.size());
  uint32_t val_size = 8;
  const uint32_t encoded_len = VarintLength(key_size) + key_size + val_size;
  // key + value(8-bit pointer)
  // handle keycompare
  KeyHandle handle = skip_list_.AllocateKey(encoded_len);
  char* buf = static_cast<char*>(handle);
  char* p = EncodeVarint32(buf, key_size);
  memcpy(p, userKey.data(), key_size);
  p += key_size;
  memcpy(p, &addr, val_size);
  // write the assembled key into the skiplist
  bool ok = skip_list_.Insert((char*)handle);
  assert(ok);
}

// According to userkey, compare the startkey of each memory block in the jump
// table and query the memory block address
// userkey:new data
RangeArena* MultiRangeArena::SearchBlock(const Slice& userkey) {
  InlineSkipList<const MemTableRep::KeyComparator&>::Iterator iter(
      &this->skip_list_);
  int len = userkey.size();
  uint32_t keySize = VarintLength(len) + len;
  char ret[keySize];
  char* p = EncodeVarint32(ret, len);
  memcpy(p, userkey.data(), userkey.size());

  iter.SeekForPrev(ret);
  if (!iter.Valid()) {
    ROCKS_LOG_INFO(info_log_, "memblock is null");
    return nullptr;
  }

  if (iter.Valid()) {
    Slice handle = GetLengthPrefixedSlice(iter.key());
    const char* val_addr = handle.data() + handle.size();
    RangeArena* addr = *(RangeArena**)(val_addr);
    return addr;
  }

  return nullptr;
}

// Modify the memory block address corresponding to userkey in the skiplist
// userkey: startkey of，current memblock
// newaddr:head address of current memblock
void MultiRangeArena::RangeArenaChange(const Slice& userkey, RangeArena* new_addr) {
  int len = userkey.size();
  uint32_t keySize = VarintLength(len) + len;
  char ret[keySize];
  memset(ret, '\0', keySize);
  char* p = EncodeVarint32(ret, len);
  memcpy(p, userkey.data(), userkey.size());
  InlineSkipList<const MemTableRep::KeyComparator&>::Iterator iter(
      &this->skip_list_);
  iter.Seek(ret);

  assert(iter.Valid());
  Slice handle = GetLengthPrefixedSlice(iter.key());
  Slice val = Slice(handle.data() + handle.size(), 8);
  memcpy(const_cast<char*>(val.data()), &new_addr, 8);

}

void MultiRangeArena::RangeArenaClear(){
  InlineSkipList<const MemTableRep::KeyComparator&>::Iterator iter(
      &this->skip_list_);
  iter.SeekToFirst();
  while(iter.Valid()){
    Slice handle = GetLengthPrefixedSlice(iter.key());
    const char* val_addr = handle.data() + handle.size();
    RangeArena* addr = *(RangeArena**)(val_addr);
    delete addr;
    iter.Next();
  }
}

}  // namespace rocksdb